Logistic Regression Example with MLlib and Spark ML

Start up Spark

Spark ML

Read the training and test data. In this case the test data is labeled as well: we derive the label from the arrdelay field.


In [ ]:
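training = sqlContext.read.parquet("s3://zoltanctoth-flights/training.parquet")
# NOTE: this reads the same file as the training set, so the evaluation below
# measures fit on the training data; point it at a held-out test file instead.
test = sqlContext.read.parquet("s3://zoltanctoth-flights/training.parquet")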

test.printSchema()

In [ ]:
test.first()

In [ ]:
# Keep both DataFrames in memory; each is reused by several actions below
training.cache()
test.cache()

Generate label column for the training data


In [ ]:
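from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

# 1.0 if the flight arrived late (positive arrdelay), else 0.0.
# Assumes arrdelay has no nulls: under Python 3, None > 0 raises a TypeError.
is_late = udf(lambda delay: 1.0 if delay > 0 else 0.0, DoubleType())
training = training.withColumn("is_late", is_late(training.arrdelay))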
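The label rule is a plain threshold on arrdelay: any positive arrival delay counts as late, while early and exactly on-time arrivals do not. A plain-Python sketch of the same rule follows; returning None for a missing delay (instead of raising, as the UDF above would on a null) is an assumption of this sketch, not something the notebook specifies.

```python
from typing import Optional


def label_from_delay(delay: Optional[float]) -> Optional[float]:
    """Map an arrival delay to a binary label: 1.0 = late, 0.0 = not late.

    Assumption: a missing delay yields None rather than an error, so such
    rows can be filtered out before training.
    """
    if delay is None:
        return None
    return 1.0 if delay > 0 else 0.0


# Early, on-time, late and missing arrivals
for d in [-5, 0, 12, None]:
    print(d, label_from_delay(d))
```

Inside Spark the same threshold can also be written without a Python UDF, e.g. `(training.arrdelay > 0).cast("double")`, which avoids shipping rows to a Python worker; with that expression a null delay simply produces a null label.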

Create and fit Spark ML model


In [ ]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Build the feature vector from every column except arrdelay and its derived label, is_late.
# VectorAssembler only accepts numeric, boolean and vector columns.
feature_assembler = VectorAssembler(
    inputCols=[x for x in training.columns if x not in ["is_late", "arrdelay"]],
    outputCol="features")

reg = LogisticRegression(
    maxIter=100,
    labelCol="is_late",
    predictionCol="prediction")

model = Pipeline(stages=[feature_assembler, reg]).fit(training)
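Conceptually the pipeline does two things: VectorAssembler concatenates the chosen columns of each row into one feature vector, and LogisticRegression learns weights w and an intercept b so that sigmoid(w·x + b) estimates P(is_late = 1). The sketch below mimics both steps on plain Python dicts. It uses batch gradient descent on the log-loss for readability; that optimizer is swapped in for illustration (Spark's implementation uses quasi-Newton methods such as L-BFGS and standardizes features by default), and no regularization is applied. The columns dep_delay and distance are hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple

# Excluded from the features, as in the notebook cell above
LABEL_COLUMNS = {"is_late", "arrdelay"}


def assemble(row: Dict[str, float], feature_cols: Sequence[str]) -> List[float]:
    """Concatenate the selected columns into one feature vector (like VectorAssembler)."""
    return [float(row[c]) for c in feature_cols]


def sigmoid(z: float) -> float:
    return 1.0 / (1.0 + math.exp(-z))


def fit_logistic(xs: List[List[float]], ys: List[float],
                 step: float = 0.1, max_iter: int = 100) -> Tuple[List[float], float]:
    """Fit weights and an intercept by batch gradient descent on the mean log-loss."""
    n, d = len(xs), len(xs[0])
    w, b = [0.0] * d, 0.0
    for _ in range(max_iter):
        grad_w, grad_b = [0.0] * d, 0.0
        for x, y in zip(xs, ys):
            # Gradient of the log-loss with respect to the margin is (p - y)
            err = sigmoid(sum(wi * xi for wi, xi in zip(w, x)) + b) - y
            for j in range(d):
                grad_w[j] += err * x[j] / n
            grad_b += err / n
        w = [wi - step * g for wi, g in zip(w, grad_w)]
        b -= step * grad_b
    return w, b


# Hypothetical rows: dep_delay and distance are made-up feature columns
rows = [
    {"dep_delay": -3.0, "distance": 1.0, "arrdelay": -5.0, "is_late": 0.0},
    {"dep_delay": -1.0, "distance": 2.0, "arrdelay": -2.0, "is_late": 0.0},
    {"dep_delay": 4.0, "distance": 1.5, "arrdelay": 6.0, "is_late": 1.0},
    {"dep_delay": 2.0, "distance": 2.5, "arrdelay": 3.0, "is_late": 1.0},
]
feature_cols = [c for c in rows[0] if c not in LABEL_COLUMNS]
xs = [assemble(r, feature_cols) for r in rows]
ys = [r["is_late"] for r in rows]
w, b = fit_logistic(xs, ys, max_iter=1000)
print(feature_cols, w, b)
```

Excluding arrdelay matters for the same reason as in the notebook: the label is a deterministic function of it, so leaving it in would let the model read off the answer.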

Predict whether the aircraft will be late


In [ ]:
predicted = model.transform(test)

In [ ]:
predicted.take(1)
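The row returned by `take(1)` carries, besides the input columns, the rawPrediction, probability and prediction columns added by the fitted model. As we understand Spark's binary logistic regression, the score is the margin m = w·x + b; rawPrediction holds [-m, m], probability holds [1 - sigmoid(m), sigmoid(m)], and prediction is 1.0 only when sigmoid(m) is strictly greater than the threshold (0.5 by default). A plain-Python sketch of that mapping, with made-up weights:

```python
import math
from typing import Dict, Sequence


def predict_row(x: Sequence[float], w: Sequence[float], b: float,
                threshold: float = 0.5) -> Dict[str, object]:
    """Mimic the three output columns of a binary logistic regression model."""
    margin = sum(wi * xi for wi, xi in zip(w, x)) + b
    p = 1.0 / (1.0 + math.exp(-margin))  # sigmoid(margin) = P(label = 1)
    return {
        "rawPrediction": [-margin, margin],
        "probability": [1.0 - p, p],
        "prediction": 1.0 if p > threshold else 0.0,
    }


# Made-up weights and intercept, purely for illustration (margin = 0.9)
print(predict_row([2.0, 1.0], w=[0.8, -0.2], b=-0.5))
```

Raising the threshold trades recall on late flights for precision, which is worth keeping in mind when reading the crosstab below.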

Check model performance


In [ ]:
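# The test set has no label column yet; derive it the same way as for training
predicted = predicted.withColumn("is_late", is_late(predicted.arrdelay))
# Rows: actual is_late values; columns: predicted labels
predicted.crosstab("is_late", "prediction").show()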
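The crosstab is a confusion matrix, and the usual summary metrics follow directly from its four cells. A plain-Python sketch computing accuracy, precision and recall for the "late" class from (is_late, prediction) pairs; the sample pairs are made up. Within Spark, `pyspark.ml.evaluation` also provides `BinaryClassificationEvaluator` (area under ROC by default) and `MulticlassClassificationEvaluator` (which supports accuracy); both default to a label column named "label", so here they would need `labelCol="is_late"`.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def confusion_metrics(pairs: Iterable[Tuple[float, float]]) -> Dict[str, float]:
    """Accuracy, precision and recall for the positive class (1.0 = late)."""
    counts = Counter(pairs)  # keys are (actual, predicted)
    tp = counts[(1.0, 1.0)]
    tn = counts[(0.0, 0.0)]
    fp = counts[(0.0, 1.0)]
    fn = counts[(1.0, 0.0)]
    total = tp + tn + fp + fn
    return {
        "accuracy": (tp + tn) / total if total else 0.0,
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }


# Made-up (is_late, prediction) pairs: 1 TP, 1 FN, 2 TN, 1 FP
sample = [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0)]
print(confusion_metrics(sample))  # accuracy 0.6, precision 0.5, recall 0.5
```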

In [ ]: